package com.process;

import canal.util.ip.IPSeeker;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.base.MQBaseETL;
import com.bean.ClickLogEntity;
import com.bean.ClickLogWideEntity;
import com.utils.*;
import nl.basjes.parse.httpdlog.HttpdLoglineParser;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.io.File;

/**
 * Real-time ETL job for click-stream logs.
 *
 * <p>Each raw click-log line is parsed into a {@link ClickLogEntity}, widened into a
 * {@link ClickLogWideEntity} that additionally carries province, city and request time,
 * serialized to JSON and handed to {@code KafkaMySink}.
 *
 * <p>Author: Sky. Created: 2021/8/12 20:57.
 */
public class ClickLogDataETL {

    /** Character that terminates a province name in the region text, e.g. 江苏省. */
    private static final char PROVINCE_SUFFIX = '省';

    /** Character that terminates a municipality/city name in the region text, e.g. 上海市. */
    private static final char CITY_SUFFIX = '市';

    /**
     * Program entry point: builds the job and runs it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        new ClickLogDataETL().process();
    }

    /**
     * Creates the execution environment, obtains the click-log stream for the configured
     * input topic, attaches the ETL pipeline and executes the job.
     *
     * @throws Exception if the job fails to execute
     */
    public void process() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        MQBaseETL baseEtl = new MQBaseETL();
        DataStream<String> clickLogDataStream =
                baseEtl.KafkaConsummer(ConfigReader.input_topic_click_log, env);

        etl(clickLogDataStream);

        env.execute("aaa");
    }

    /**
     * Attaches the click-log ETL pipeline to the given stream.
     *
     * <p>Steps:
     * <ol>
     *   <li>parse each log line into a {@link ClickLogEntity};</li>
     *   <li>look up the region for the record's IP address and copy province, city and
     *       request time into a {@link ClickLogWideEntity};</li>
     *   <li>serialize the widened record to JSON and pass it to the sink.</li>
     * </ol>
     *
     * @param clickLogDataStream stream of raw click-log lines
     */
    public void etl(DataStream<String> clickLogDataStream) {
        // Step 1: raw log line -> ClickLogEntity.
        DataStream<ClickLogEntity> clickLogEntityDataStream =
                clickLogDataStream.map(new ParseClickLogFunction());
        // This banner is printed once while the pipeline is being defined, not per record.
        System.out.println("转换成bean对象=====================");
        clickLogEntityDataStream.print();

        // Step 2: ClickLogEntity -> ClickLogWideEntity with province / city / request time.
        DataStream<ClickLogWideEntity> clickLogWideDataStream =
                clickLogEntityDataStream.map(new WidenClickLogFunction());
        // Same as above: printed once during pipeline definition.
        System.out.println("转换成宽表bean对象=====================");
        clickLogWideDataStream.print();

        // Step 3: the sink is fed strings, so each widened record is turned into JSON first.
        DataStream<String> clickLogStreams = clickLogWideDataStream.map(new ToJsonFunction());

        clickLogStreams.addSink(new KafkaMySink());
    }

    /**
     * Splits a region description into province and city.
     *
     * <p>The text is cut after the first 省; when there is no 省 (e.g. a municipality such
     * as 上海市闵行区) it is cut after the first 市. Text without either marker is returned
     * whole as the province with an empty city. Examples:
     * <ul>
     *   <li>{@code 江苏省常州市} gives {@code [江苏省, 常州市]}</li>
     *   <li>{@code 上海市闵行区} gives {@code [上海市, 闵行区]}</li>
     *   <li>{@code 上海市} gives {@code [上海市, ""]}</li>
     * </ul>
     *
     * <p>{@code indexOf}/{@code substring} is used rather than {@code String.split} because
     * {@code split} discards trailing empty strings (losing the suffix, or producing an empty
     * array for a bare delimiter) and keeping only its second element would drop any text
     * after a repeated delimiter.
     *
     * @param country region text returned by the IP lookup; may be {@code null}
     * @return a two-element array {@code [province, city]}; never {@code null}, elements
     *         never {@code null}
     */
    private static String[] splitArea(String country) {
        if (country == null) {
            // No region available: emit empty fields instead of failing the record.
            return new String[] {"", ""};
        }
        int cut = country.indexOf(PROVINCE_SUFFIX);
        if (cut < 0) {
            cut = country.indexOf(CITY_SUFFIX);
        }
        if (cut < 0) {
            return new String[] {country, ""};
        }
        // The suffix character stays with the province part.
        return new String[] {country.substring(0, cut + 1), country.substring(cut + 1)};
    }

    /** Parses a raw click-log line into a {@link ClickLogEntity}. */
    private static final class ParseClickLogFunction extends RichMapFunction<String, ClickLogEntity> {

        private static final long serialVersionUID = 1L;

        /** Log-line parser; created in {@link #open(Configuration)}, hence transient. */
        private transient HttpdLoglineParser<ClickLogEntity> parser;

        /**
         * Runs once before any record is mapped; used to set up the parser.
         *
         * @param parameters function configuration (not used)
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            parser = ClickLogEntity.createClickLogParse();
        }

        /**
         * @param value one raw click-log line
         * @return the entity produced by the parser for this line
         */
        @Override
        public ClickLogEntity map(String value) throws Exception {
            return parser.parse(new ClickLogEntity(), value);
        }
    }

    /** Widens a {@link ClickLogEntity} with the province and city looked up from its IP. */
    private static final class WidenClickLogFunction
            extends RichMapFunction<ClickLogEntity, ClickLogWideEntity> {

        private static final long serialVersionUID = 1L;

        /** IP lookup helper; created in {@link #open(Configuration)}, hence transient. */
        private transient IPSeeker ipSeeker;

        /**
         * Runs once before any record is mapped; used to set up the IP lookup.
         *
         * @param parameters function configuration (not used)
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            // NOTE(review): the IP database location is hard-coded ("qqwry.dat" with "c:\\IP");
            // presumably this only works on a machine that has that path - consider making it
            // configurable or shipping the file with the job.
            ipSeeker = new IPSeeker("qqwry.dat","c:\\IP");
        }

        /**
         * @param clickLogEntity parsed click-log record
         * @return the widened record with province, city and request time filled in
         */
        @Override
        public ClickLogWideEntity map(ClickLogEntity clickLogEntity) throws Exception {
            ClickLogWideEntity wide = ClickLogWideEntity.getClickLogWideEntry(clickLogEntity);

            String[] area = splitArea(ipSeeker.getCountry(wide.getIp()));
            wide.setProvince(area[0]);
            wide.setCity(area[1]);

            wide.setRequestTime(clickLogEntity.getRequestTime());
            return wide;
        }
    }

    /** Serializes a {@link ClickLogWideEntity} to a JSON string. */
    private static final class ToJsonFunction implements MapFunction<ClickLogWideEntity, String> {

        private static final long serialVersionUID = 1L;

        /**
         * @param clickLogWideEntity widened click-log record
         * @return its JSON representation, with circular-reference detection disabled
         */
        @Override
        public String map(ClickLogWideEntity clickLogWideEntity) throws Exception {
            return JSON.toJSONString(clickLogWideEntity, SerializerFeature.DisableCircularReferenceDetect);
        }
    }
}
